package com.shujia.flink.kafka;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Demo02KafkaSink {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setBootstrapServers("master:9092,node1:9092,node2:9092")
                .setTopics("flink_t1")
                .setGroupId("flink_grp_01")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        DataStreamSource<String> kafkaDS = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafkaSource");

        DataStream<String> wordCntDS = kafkaDS.flatMap((line, out) -> {
                    for (String word : line.split(",")) {
                        out.collect(word);
                    }
                }, Types.STRING)
                .map(word -> Tuple2.of(word, 1), Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(kv -> kv.f0, Types.STRING)
                .sum(1)
                .map(t2 -> t2.f0 + "," + t2.f1, Types.STRING);


        KafkaSink<String> kafkaSink = KafkaSink
                .<String>builder()
                .setBootstrapServers("master:9092,node1:9092,node2:9092")
                .setRecordSerializer(
                        KafkaRecordSerializationSchema
                                .builder()
                                .setTopic("flink_word_cnt_01")
                                .setValueSerializationSchema(new SimpleStringSchema())
                                .build()
                )
                // 写入Kafka的语义：AT_LEAST_ONCE、EXACTLY_ONCE
                .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .build();

        wordCntDS.sinkTo(kafkaSink);

        env.execute();

    }
}
